|
|
@@ -76,7 +76,7 @@ module Agents
|
76
|
76
|
vals = vals.merge Hash.from_xml(pr.to_xml)
|
77
|
77
|
if not_already_in_memory?(vals)
|
78
|
78
|
create_event(:payload => vals)
|
79
|
|
- add_to_memory(vals)
|
|
79
|
+ update_memory(vals)
|
80
|
80
|
else
|
81
|
81
|
end
|
82
|
82
|
end
|
|
|
@@ -85,15 +85,23 @@ module Agents
|
85
|
85
|
hydra.queue request
|
86
|
86
|
hydra.run
|
87
|
87
|
end
|
|
88
|
+ def update_memory(vals)
|
|
89
|
+ add_to_memory(vals)
|
|
90
|
+ cleanup_old_memory
|
|
91
|
+ end
|
|
92
|
+ def cleanup_old_memory
|
|
93
|
+ self.memory["existing_routes"] ||= []
|
|
94
|
+ self.memory["existing_routes"].reject!{|h| h["currentTime"] <= (Time.now - 2.hours)}
|
|
95
|
+ end
|
88
|
96
|
def add_to_memory(vals)
|
89
|
97
|
self.memory["existing_routes"] ||= []
|
90
|
98
|
self.memory["existing_routes"] << {stopTag: vals["stopTag"], tripTag: vals["prediction"]["tripTag"], epochTime: vals["prediction"]["epochTime"], currentTime: Time.now}
|
91
|
99
|
end
|
92
|
100
|
def not_already_in_memory?(vals)
|
93
|
101
|
m = self.memory["existing_routes"]
|
94
|
|
- m.select{|h| h[:stopTag] == vals["stopTag"] &&
|
95
|
|
- h[:tripTag] == vals["prediction"]["tripTag"] &&
|
96
|
|
- h[:epochTime] == vals["prediction"]["epochTime"]
|
|
102
|
+ m.select{|h| h['stopTag'] == vals["stopTag"] &&
|
|
103
|
+ h['tripTag'] == vals["prediction"]["tripTag"] &&
|
|
104
|
+ h['epochTime'] == vals["prediction"]["epochTime"]
|
97
|
105
|
}.count == 0
|
98
|
106
|
end
|
99
|
107
|
def default_options
|